{
 "cells": [
  {
   "cell_type": "markdown",
   "source": [
    "# Azure OpenAI for Big Data\n",
    "\n",
    "The Azure OpenAI service can be used to solve a large number of natural language tasks by prompting the completion API. To make it easier to scale your prompting workflows from a few examples to large datasets, we have integrated the Azure OpenAI service with the distributed machine learning library [SynapseML](https://www.microsoft.com/en-us/research/blog/synapseml-a-simple-multilingual-and-massively-parallel-machine-learning-library/). This integration makes it easy to use the [Apache Spark](https://spark.apache.org/) distributed computing framework to process millions of prompts with the OpenAI service. This tutorial shows how to apply large language models at a distributed scale using Azure OpenAI and Azure Synapse Analytics.\n",
    "\n",
    "## Step 1: Prerequisites\n",
    "\n",
    "The key prerequisites for this quickstart are a working Azure OpenAI resource and an Apache Spark cluster with SynapseML installed. We suggest creating a Synapse workspace, but Azure Databricks, HDInsight, Spark on Kubernetes, or even a Python environment with the `pyspark` package will also work.\n",
    "\n",
    "1. An Azure OpenAI resource – request access [here](https://customervoice.microsoft.com/Pages/ResponsePage.aspx?id=v4j5cvGGr0GRqy180BHbR7en2Ais5pxKtso_Pz4b1_xUOFA5Qk1UWDRBMjg0WFhPMkIzTzhKQ1dWNyQlQCN0PWcu) before [creating a resource](https://docs.microsoft.com/en-us/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource)\n",
    "1. [Create a Synapse workspace](https://docs.microsoft.com/en-us/azure/synapse-analytics/get-started-create-workspace)\n",
    "1. [Create a serverless Apache Spark pool](https://docs.microsoft.com/en-us/azure/synapse-analytics/get-started-analyze-spark#create-a-serverless-apache-spark-pool)\n",
    "\n",
    "\n",
    "## Step 2: Import this guide as a notebook\n",
    "\n",
    "The next step is to add this code to your Spark cluster. You can either create a notebook in your Spark platform and copy the code into it, or download the notebook and import it into Synapse Analytics:\n",
    "\n",
    "1. [Download this demo as a notebook](https://github.com/microsoft/SynapseML/blob/master/notebooks/features/cognitive_services/CognitiveServices%20-%20OpenAI.ipynb) (click Raw, then save the file)\n",
    "1. Import the notebook [into the Synapse Workspace](https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-development-using-notebooks#create-a-notebook) or, if using Databricks, [into the Databricks Workspace](https://docs.microsoft.com/en-us/azure/databricks/notebooks/notebooks-manage#create-a-notebook)\n",
    "1. Install SynapseML on your cluster. See the installation instructions for Synapse at the bottom of [the SynapseML website](https://microsoft.github.io/SynapseML/). Note that this requires pasting an additional cell at the top of the notebook you just imported.\n",
    "1. Connect your notebook to a cluster and follow along, editing and running the cells below.\n",
    "\n",
    "## Step 3: Fill in your service information\n",
    "\n",
    "Next, edit the cell in the notebook to point to your service. In particular, set the `service_name`, `deployment_name`, and `key` variables to match those of your OpenAI service:"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "dd4c8776-6853-4257-bef8-72778724ad57"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "import os\n",
    "from pyspark.sql import SparkSession\n",
    "from synapse.ml.core.platform import running_on_synapse, find_secret\n",
    "\n",
    "# Bootstrap Spark Session\n",
    "spark = SparkSession.builder.getOrCreate()\n",
    "if running_on_synapse():\n",
    "    from notebookutils.visualization import display\n",
    "\n",
    "# Fill in the following lines with your service information\n",
    "service_name = \"synapseml-openai\"\n",
    "deployment_name = \"text-davinci-001\"\n",
    "key = find_secret(\"openai-api-key\")  # please replace this with your key as a string\n",
    "\n",
    "assert key is not None and service_name is not None"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "1b0db8af-7fe2-40bc-9df4-cc7f274d53f0"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "## Step 4: Create a dataset of prompts\n\nNext, create a dataframe consisting of a series of rows, with one prompt per row. \n\nYou can also load data directly from ADLS or other databases. For more information on loading and preparing Spark dataframes, see the [Apache Spark data loading guide](https://spark.apache.org/docs/latest/sql-data-sources.html)."
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "76f069b7-14e8-44ea-97f0-1c49cf02eeed"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "df = spark.createDataFrame(\n    [\n        (\"Hello my name is\",),\n        (\"The best code is code thats\",),\n        (\"SynapseML is \",),\n    ]\n).toDF(\"prompt\")"
   ],
   "metadata": {
    "jupyter": {
     "source_hidden": false,
     "outputs_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    },
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "1a4366df-4d40-45a0-b1a7-6086b9c693d2"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "## Step 5: Create the OpenAICompletion Apache Spark Client\n\nTo apply the OpenAI Completion service to the dataframe you just created, create an `OpenAICompletion` object, which serves as a distributed client. Parameters of the service can be set either to a single value or to a column of the dataframe, using the appropriate setters on the `OpenAICompletion` object. Here we set `maxTokens` to 200. A token is around 4 characters, and this limit applies to the sum of the prompt and the result. We also set the `promptCol` parameter to the name of the prompt column in the dataframe."
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "fb48578c-e4b3-49fc-9ee2-2f8ae8808c19"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "from synapse.ml.cognitive import OpenAICompletion\n\ncompletion = (\n    OpenAICompletion()\n    .setSubscriptionKey(key)\n    .setDeploymentName(deployment_name)\n    .setUrl(\"https://{}.openai.azure.com/\".format(service_name))\n    .setMaxTokens(200)\n    .setPromptCol(\"prompt\")\n    .setErrorCol(\"error\")\n    .setOutputCol(\"completions\")\n)"
   ],
   "metadata": {
    "jupyter": {
     "source_hidden": false,
     "outputs_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    },
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "2dca7a9d-6092-48af-8653-10c141fd440d"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "## Step 6: Transform the dataframe with the OpenAICompletion Client\n\nNow that you have the dataframe and the completion client, you can transform your input dataset and add a column called `completions` containing all of the information the service returns. We select just the text for simplicity."
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "64b2a454-65ab-45ec-9946-d92539899781"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "from pyspark.sql.functions import col\n",
    "\n",
    "completed_df = completion.transform(df).cache()\n",
    "display(\n",
    "    completed_df.select(\n",
    "        col(\"prompt\"),\n",
    "        col(\"error\"),\n",
    "        col(\"completions.choices.text\").getItem(0).alias(\"text\"),\n",
    "    )\n",
    ")"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "90684f9f-5c74-463f-a809-75a219489d7f"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "Your output should look something like this. Note that the completion text will differ:\n\n| **prompt** | **error** | **text** |\n|---|---|---|\n| Hello my name is | null | Makaveli I'm eighteen years old and I want to be a rapper when I grow up I love writing and making music I'm from Los Angeles, CA |\n| The best code is code thats | null | understandable This is a subjective statement, and there is no definitive answer. |\n| SynapseML is | null | A machine learning algorithm that is able to learn how to predict the future outcome of events. |"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "d217c822-a213-43b3-aeba-2653e52b3421"
    }
   }
  },
  {
   "cell_type": "markdown",
   "source": [
    "## Additional Usage Examples\n\n### Improve throughput with request batching \n\nThe example above makes several requests to the service, one for each prompt. To complete multiple prompts in a single request, use batch mode. First, create a dataframe with a list of prompts per row. Then, in the `OpenAICompletion` object, set the batch prompt column to \"batchPrompt\" instead of setting the prompt column to \"prompt\".\n\n**Note** that as of this writing there is a limit of 20 prompts in a single request, as well as a hard limit of 2048 \"tokens\", or approximately 1500 words."
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "9111da73-81f2-48e5-9a0c-eb65e1567f91"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "batch_df = spark.createDataFrame(\n    [\n        ([\"The time has come\", \"Pleased to\", \"Today stocks\", \"Here's to\"],),\n        ([\"The only thing\", \"Ask not what\", \"Every litter\", \"I am\"],),\n    ]\n).toDF(\"batchPrompt\")"
   ],
   "metadata": {
    "jupyter": {
     "source_hidden": false,
     "outputs_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    },
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "9f9b7953-6d96-4f83-b61d-c396cefb28ea"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "Next we create the OpenAICompletion object. Rather than setting the prompt column, set the batchPrompt column if your column is of type `Array[String]`."
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "0bb5daf9-8155-460c-b2dd-e1ca302a3776"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "batch_completion = (\n    OpenAICompletion()\n    .setSubscriptionKey(key)\n    .setDeploymentName(deployment_name)\n    .setUrl(\"https://{}.openai.azure.com/\".format(service_name))\n    .setMaxTokens(200)\n    .setBatchPromptCol(\"batchPrompt\")\n    .setErrorCol(\"error\")\n    .setOutputCol(\"completions\")\n)"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "8411e5ba-7f22-4ac9-a78e-1746a7ccc8bc"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "In the call to `transform`, one request is made per row. Since each row contains multiple prompts, each request is sent with all of the prompts in that row. The results contain one row for each row in the input."
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "be2a0d15-40e1-4a3d-a879-d7d0e0129b35"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "completed_batch_df = batch_completion.transform(batch_df).cache()\ndisplay(completed_batch_df)"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "a6fb7509-f582-47bd-8b57-f59d51c03eb3"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "### Using an automatic minibatcher\n\nIf your data is in column format, you can transpose it to row format using SynapseML's `FixedMiniBatchTransformer`."
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "2dd7259b-173a-41ef-b98e-7b1dc0df875f"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "from pyspark.sql.types import StringType\n",
    "from synapse.ml.stages import FixedMiniBatchTransformer\n",
    "from synapse.ml.core.spark import FluentAPI\n",
    "\n",
    "completed_autobatch_df = (\n",
    "    df.coalesce(\n",
    "        1\n",
    "    )  # Force a single partition so that our small 3-row dataframe forms a single batch; you can remove this step for large datasets\n",
    "    .mlTransform(FixedMiniBatchTransformer(batchSize=4))\n",
    "    .withColumnRenamed(\"prompt\", \"batchPrompt\")\n",
    "    .mlTransform(batch_completion)\n",
    ")\n",
    "\n",
    "display(completed_autobatch_df)"
   ],
   "metadata": {
    "jupyter": {
     "source_hidden": false,
     "outputs_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    },
    "collapsed": false,
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "04212778-8002-4e30-bf31-b7511c5776fd"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "### Prompt engineering for translation\n\nThe Azure OpenAI service can solve many different natural language tasks through [prompt engineering](https://docs.microsoft.com/en-us/azure/cognitive-services/openai/how-to/completions). Here we show an example of prompting for language translation:"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "f1611cd5-1af9-458f-a1d5-b1f517194cc8"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "translate_df = spark.createDataFrame(\n",
    "    [\n",
    "        (\"Japanese: Ookina hako \\nEnglish: Big box \\nJapanese: Midori tako\\nEnglish:\",),\n",
    "        (\n",
    "            \"French: Quelle heure est-il à Montréal? \\nEnglish: What time is it in Montreal? \\nFrench: Où est le poulet? \\nEnglish:\",\n",
    "        ),\n",
    "    ]\n",
    ").toDF(\"prompt\")\n",
    "\n",
    "display(completion.transform(translate_df))"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "0cee629b-240d-411f-a067-9f7b9ac6ce5d"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "### Prompt for question answering\n\nHere, we prompt GPT-3 for general-knowledge question answering:"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "3a726683-8e19-4ebf-8244-bde82ec4fdbe"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "qa_df = spark.createDataFrame(\n    [\n        (\n            \"Q: Where is the Grand Canyon?\\nA: The Grand Canyon is in Arizona.\\n\\nQ: What is the weight of the Burj Khalifa in kilograms?\\nA:\",\n        )\n    ]\n).toDF(\"prompt\")\n\ndisplay(completion.transform(qa_df))"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "831abdac-4b6c-4b7d-b99a-43dcb75c4169"
    }
   },
   "outputs": [],
   "execution_count": 0
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  },
  "save_output": true,
  "kernelspec": {
   "name": "synapse_pyspark",
   "language": "Python",
   "display_name": "Synapse PySpark"
  },
  "synapse_widget": {
   "version": "0.1",
   "state": {
    "4bd0e60b-98ae-4bfe-98ee-6f0399ceb456": {
     "type": "Synapse.DataFrame",
     "sync_state": {
      "table": {
       "rows": [
        {
         "0": "Once upon a time",
         "1": [
          " there was a girl who had a dream of becoming a writer.\n\nShe started writing short stories"
         ]
        },
        {
         "0": "Hello my name is",
         "1": [
          "***** and I have a question about my cat\n\nHello, thank you for bringing your question to"
         ]
        },
        {
         "0": "The best code is code thats",
         "1": [
          " not there\n\nCommenting your code is important. Not only does it help you remember what you"
         ]
        }
       ],
       "schema": [
        {
         "key": "0",
         "name": "prompt",
         "type": "string"
        },
        {
         "key": "1",
         "name": "text",
         "type": "ArrayType(StringType,true)"
        }
       ],
       "truncated": false
      },
      "isSummary": false,
      "language": "scala"
     },
     "persist_state": {
      "view": {
       "type": "details",
       "tableOptions": {},
       "chartOptions": {
        "seriesFieldKeys": [
         "0"
        ],
        "categoryFieldKeys": [
         "0"
        ],
        "isStacked": false,
        "aggregationType": "count",
        "chartType": "bar"
       }
      }
     }
    }
   }
  },
  "kernel_info": {
   "name": "synapse_pyspark"
  },
  "application/vnd.databricks.v1+notebook": {
   "notebookName": "CognitiveServices - OpenAI",
   "dashboards": [],
   "notebookMetadata": {
    "pythonIndentUnit": 2
   },
   "language": "python",
   "widgets": {},
   "notebookOrigID": 446901528076027
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}